
from pyspark import *
# ModuleNotFoundError: No module named 'defs_4'
from defs_4 import *

if __name__ == '__main__':
    conf = SparkConf().setAppName("1_mapValues").setMaster("yarn")
    """
    如果提交到集群运行，除了主代码以外，还依赖了其他的代码文件
    需要设置一个参数，来告知spark，还有依赖文件需要同步上传到集群中
    参数叫做：spark.submit.pyFiles
    参数的值可以是单个.py文件，也可以是.zip压缩包
    """
    conf.set("spark.submit.pyFiles","defs_4.py")
    sc = SparkContext(conf=conf)
    # path=os.path.dirname(os.getcwd()) + "/data/input/order.text"
    path="/updown/input/order.text"
    print(path)
    file_rdd=sc.textFile(path)
    result=file_rdd.flatMap(lambda line:line.split("|"))\
        .map(lambda js:json.loads(js))\
        .filter(lambda js:js["areaName"] == '北京')\
        .map(city_with_category)\
        .distinct()
    # print(type(json.loads("{'key1':1,'key2':2}"))) 错误
    # print(type(json.loads('{"key1":1,"key2":2}'))) 错误
    print(result.collect())

    sc.stop()








